package slatekit.samples.job

import slatekit.common.Identity
import slatekit.actors.Status
import slatekit.jobs.WResult
import slatekit.jobs.Task
import slatekit.jobs.Worker
import java.util.concurrent.atomic.AtomicInteger

/**
 * Sample user model.
 *
 * @property id numeric identifier of the user
 * @property email address the newsletter is sent to
 */
data class User(
    val id: Int,
    val email: String
)

/** Newsletter text used by the sample jobs. */
val NEWS_LETTER_MESSAGE = "New version coming out soon!"


/** Sample recipients: 20 users with ids 1 through 20 and emails of the form `user<id>@company1.com`. */
val allUsers = List(20) { index ->
    val id = index + 1
    User(id, "user$id@company1.com")
}



/**
 * =================================================================================================
 * OPTION 1: One Time Job:
 * A plain function for a job that runs to completion in a single call: it sends the
 * newsletter to every user in [allUsers] and then returns [WResult.Done].
 *
 * @param task the task handed to the job; only its `job` value is used, as the sender label
 * @return always [WResult.Done]
 * =================================================================================================
 */
suspend fun sendNewsLetter(task: Task): WResult {
    for (user in allUsers) {
        JobUtils.send(task.job, NEWS_LETTER_MESSAGE, user)
    }
    return WResult.Done
}



/**
 * =================================================================================================
 * OPTION 2: Paged Job
 * A function for a job that pages through work: every call sends one page of users and
 * returns whatever [JobUtils.sendNewsLetterBatch] reports for that page.
 * =================================================================================================
 */
// Position of the next user to process; kept at file level so it survives between calls.
val offset1 = AtomicInteger(0)

/**
 * Sends the next page of 4 users, starting at the position held in [offset1].
 *
 * @param task the task handed to the job; only its `job` value is used, as the sender label
 * @return the result of [JobUtils.sendNewsLetterBatch]: `WResult.Done` once no users remain,
 *         otherwise the value built by `WResult.next(...)`
 */
suspend fun sendNewsLetterWithPaging(task: Task): WResult {

    // Print some info
    println("\nProcessing : ====================================")
    println("offset: ${offset1.get()}")

    // The helper advances offset1 and builds the WResult for this page
    return JobUtils.sendNewsLetterBatch(task.job, offset1, 4)
}



/**
 * =================================================================================================
 * OPTION 3: Queued Job
 * A function for a job that processes one task taken from a queue, acknowledges it and
 * returns [WResult.More] to signal that it can take another.
 *
 * The user id is read from the text after the first "=" in `task.data`.
 * NOTE(review): a payload without "=", a non-numeric id, or an id that is not in [allUsers]
 * makes this function throw before the task is acknowledged - confirm this is the intended
 * failure path.
 *
 * @param task the queued task; `task.data` carries the user id and `task.job` the sender label
 * @return always [WResult.More]
 * =================================================================================================
 */
suspend fun sendNewsLetterFromQueue(task: Task): WResult {
    // Resolve the recipient from the payload
    val fields = task.data.split("=")
    val recipientId = fields[1].toInt()
    val recipient = allUsers.first { candidate -> candidate.id == recipientId }

    JobUtils.send(task.job, NEWS_LETTER_MESSAGE, recipient)

    // Print some info
    JobUtils.showInfo(task)

    // Acknowledge the task ( task.fail() would abandon it instead )
    task.done()

    // Indicate that this can now handle more
    return WResult.More
}



/**
 * =================================================================================================
 * OPTION 4: Worker
 * Extend from a worker to have more control over the life-cycle.
 *
 * Each call to [work] sends one page of 4 users; the life-cycle hooks report what happened
 * through [notify], which prints to standard output.
 * =================================================================================================
 */
class NewsLetterWorker(id:Identity) : Worker<String>(id) {
    // Position of the next user to send to; advanced by every call to work()
    private val offset = AtomicInteger(0)

    // Initialization hook ( for setup / logs / alerts )
    override suspend fun started() {
        notify("initializing", listOf("id" to this.id.name))
    }

    // Implement your work here.
    // NOTE: If you are not using a queue, this task will be empty e.g. Task.empty
    override suspend fun work(task: Task): WResult {
        // Print some info
        JobUtils.showInfo(task)

        return JobUtils.sendNewsLetterBatch(task.job, offset, 4)
    }

    // Transition hook for when the status is changed ( e.g. from Status.Running -> Status.Paused )
    // Only the state and its name are stored; the supplied note is not recorded here.
    override suspend fun move(state: Status, note:String?) {
        _status.set(Pair(state, state.name))
        notify("move", listOf("status" to state.name))
    }

    // Completion hook ( for logic / logs / alerts )
    override suspend fun completed(note:String?) {
        notify("done", listOf("id" to this.id.name))
    }

    // Failure hook ( for logic / logs / alerts )
    override suspend fun failed(note:String?) {
        notify("failure", listOf("id" to this.id.name, "err" to (note ?: "")))
    }

    // Notification hook ( used by the hooks above to report events )
    // Prints "<desc> <key>=<value>,<key>=<value>". A null description or a null/empty
    // list of extras is left out instead of being printed as the text "null".
    override suspend fun notify(desc: String?, extra: List<Pair<String, String>>?) {
        val detail = extra.orEmpty().joinToString(",") { (key, value) -> "$key=$value" }
        val message = listOfNotNull(desc, detail.takeIf { it.isNotEmpty() }).joinToString(" ")
        // Simulate notification to email/alerts/etc
        println(message)
    }
}


/**
 * Utilities for this Job: printing task details, simulating a send to one user, and
 * sending one page of users for the paged examples.
 */
object JobUtils {

    /**
     * Prints the identifying fields of [task] to standard output.
     */
    fun showInfo(task: Task) {

        println("\nProcessing : ====================================")
        println("task.id  : ${task.id}")    // abc123
        println("task.from: ${task.from}")  // queue://notification
        println("task.job : ${task.job}")   // job1
        println("task.name: ${task.name}")  // users.sendNewsletter
        println("task.data: ${task.data}")  // { ... } json payload
        println("task.xid : ${task.xid}")   // 12345   correlation id
        println("\n")

    }

    /**
     * Sends the message ( newsletter ) to the user.
     * The send is simulated by printing one line to standard output.
     *
     * @param sender label of the job doing the send
     * @param msg text to deliver
     * @param user recipient
     */
    fun send(sender:String, msg:String, user: User) {
        // Simulate sending message to user
        println("job=${sender}, id=${user.id}, email=${user.email}, message=$msg")
    }


    /**
     * Sends the newsletter to one page of [allUsers] and advances [offset] past it.
     * NOTE: This is a helper method used by the paged examples in this file.
     *
     * @param sender label of the job doing the send
     * @param offset position of the first user of the page; increased by the number of users sent
     * @param batchSize maximum number of users in the page
     * @return `WResult.Done` when [offset] is outside the list ( nothing left to send ),
     *         otherwise the value built by `WResult.next(...)`
     */
    suspend fun sendNewsLetterBatch(sender:String, offset: AtomicInteger, batchSize:Int): WResult {
        val start = offset.get()

        // Get next page of records.
        // The end of the page is clamped to the list size so that a final, partial page
        // ( total not a multiple of batchSize ) does not make subList throw.
        val users = if (start < 0 || start >= allUsers.size) emptyList<User>()
        else allUsers.subList(start, (start + batchSize).coerceAtMost(allUsers.size))

        // No more records so indicate done
        if (users.isEmpty())
            return WResult.Done

        // Same text as the file-level NEWS_LETTER_MESSAGE
        users.forEach { user -> send(sender, "New version coming out soon!", user) }

        // Update offset and totals
        val advanced = offset.addAndGet(users.size)

        // Use WResult.next to
        // 1. Indicate that we are paging through work
        // 2. Provide a way to stop/pause/resume processing of this task in between pages
        // 3. Provides context into diagnostics/status
        // NOTE(review): the offset has already been advanced above, so the first argument
        // is one page beyond the next start position - confirm this is what callers expect.
        return WResult.next(advanced + batchSize.toLong(), users.size.toLong(), "users")
    }
}





